package com.jhf.youke.rocketmq.service;


import cn.hutool.json.JSONUtil;
import com.jhf.youke.rocketmq.config.RocketMqProducerConfiguration;
import com.jhf.youke.rocketmq.dto.RocketMqDto;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Sends messages to RocketMQ. Every send creates a fresh {@link DefaultMQProducer},
 * starts it, sends one message and shuts the producer down again.
 *
 * @author RHJ
 */
@Component
@Scope("prototype")
public class ProducerService {
    public static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);

    /** Upper bound (inclusive) for the attempt counter used by {@link #send(RocketMqDto, int)}. */
    private static final int MAX_SEND = 5;

    /** Pause between two attempts in {@link #send(RocketMqDto, int)}, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 30L;

    @Resource
    private RocketMqProducerConfiguration config;

    /** The producer most recently created by {@link #init(String)}. */
    private DefaultMQProducer defaultMqProducer;

    /**
     * Creates (but does not start) a producer configured from the injected configuration.
     * A random UUID is appended to the group name, so every call yields a distinct group.
     *
     * @param groupName prefix of the producer group name
     * @return the newly created, not yet started producer
     */
    public DefaultMQProducer init(String groupName) {
        DefaultMQProducer producer = new DefaultMQProducer(groupName + UUID.randomUUID());
        producer.setNamesrvAddr(config.getNameServiceAddr());
        producer.setMaxMessageSize(config.getMaxMessageSize());
        producer.setSendMsgTimeout(config.getSendMsgTimeout());
        producer.setVipChannelEnabled(false);
        defaultMqProducer = producer;
        return producer;
    }

    /**
     * Sends a plain string message. Failures while starting the producer or sending are
     * logged and not propagated (best effort); the producer is always shut down.
     *
     * @param groupName prefix of the producer group name
     * @param topic     destination topic
     * @param tag       message tag
     * @param message   message body, encoded with {@link RemotingHelper#DEFAULT_CHARSET}
     * @throws Exception declared for backward compatibility; producer creation may still fail
     */
    public void sendForString(String groupName, String topic, String tag, String message) throws Exception {
        DefaultMQProducer producer = init(groupName);
        try {
            producer.start();
            Message msg = new Message(topic, tag, message.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            LOGGER.info(sendResult.toString());
        } catch (Exception e) {
            // Best effort by design: record the failure with its stack trace instead of rethrowing.
            LOGGER.error("Failed to send string message, topic={}, tag={}", topic, tag, e);
        } finally {
            producer.shutdown();
        }
    }

    /**
     * Sends a message, retrying while the broker reports a status other than
     * {@link SendStatus#SEND_OK}.
     *
     * <p>Attempts are made while the attempt counter, starting at {@code count}, is not
     * greater than {@code MAX_SEND}. The thread pauses briefly between attempts.
     *
     * @param rocketMq message descriptor (group, topic, tag and payload)
     * @param count    starting value of the attempt counter
     * @return the successful send result, or {@code null} when no attempt succeeded,
     *         the counter already exceeded the limit, or the thread was interrupted
     * @throws Exception if an attempt itself fails with an exception
     */
    public SendResult send(RocketMqDto rocketMq, int count) throws Exception {
        for (int attempt = count; attempt <= MAX_SEND; attempt++) {
            SendResult result = send(rocketMq);
            if (SendStatus.SEND_OK == result.getSendStatus()) {
                return result;
            }
            LOGGER.warn("Send attempt counter {} (limit {}) returned status {}",
                    attempt, MAX_SEND, result.getSendStatus());
            if (attempt < MAX_SEND) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Interrupted while waiting to retry send; giving up");
                    return null;
                }
            }
        }
        // Retry budget exhausted: record the failure in the error log.
        LOGGER.error("Message was not sent successfully; attempt counter started at {} with limit {}",
                count, MAX_SEND);
        return null;
    }

    /**
     * Serializes the payload of {@code rocketMq} to JSON and sends it once.
     * The producer is shut down whether or not the send succeeds.
     *
     * @param rocketMq message descriptor (group, topic, tag and payload)
     * @return the send result reported by the producer
     * @throws Exception if encoding, starting the producer or sending fails
     */
    public SendResult send(RocketMqDto rocketMq) throws Exception {
        String groupName = rocketMq.getGroupName();
        String topic = rocketMq.getTopic();
        String tag = rocketMq.getTag();
        String json = JSONUtil.toJsonStr(rocketMq.getMessages());
        // Explicit charset, consistent with sendForString, instead of the platform default.
        byte[] body = json.getBytes(RemotingHelper.DEFAULT_CHARSET);

        DefaultMQProducer producer = init(groupName);
        try {
            producer.start();
            SendResult result = producer.send(new Message(topic, tag, body));
            LOGGER.info(result.toString());
            return result;
        } finally {
            producer.shutdown();
        }
    }
}
